// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.


#ifndef IMPALA_EXEC_HDFS_RCFILE_SCANNER_H
#define IMPALA_EXEC_HDFS_RCFILE_SCANNER_H

/// org.apache.hadoop.hive.ql.io.RCFile is the original RCFile implementation
/// and should be viewed as the canonical definition of this format. If
/// anything is unclear in this file you should consult the code in
/// org.apache.hadoop.hive.ql.io.RCFile.
//
/// The following is a pseudo-BNF grammar for RCFile. Comments are prefixed
/// with dashes:
//
/// rcfile ::=
///   <file-header>
///   <rcfile-rowgroup>+
//
/// file-header ::=
///   <file-version-header>
///   <file-key-class-name>              (only exists if version is seq6)
///   <file-value-class-name>            (only exists if version is seq6)
///   <file-is-compressed>
///   <file-is-block-compressed>         (only exists if version is seq6)
///   [<file-compression-codec-class>]
///   <file-header-metadata>
///   <file-sync-field>
//
/// -- The normative RCFile implementation included with Hive is actually
/// -- based on a modified version of Hadoop's SequenceFile code. Some
/// -- things which should have been modified were not, including the code
/// -- that writes out the file version header. Consequently, RCFile and
/// -- SequenceFile originally shared the same version header.  A newer
/// -- release has created a unique version string.
//
/// file-version-header ::= Byte[4] {'S', 'E', 'Q', 6}
///                     |   Byte[4] {'R', 'C', 'F', 1}
//
/// -- The name of the Java class responsible for reading the key buffer
/// -- component of the rowgroup.
//
/// file-key-class-name ::=
///   Text {"org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer"}
//
/// -- The name of the Java class responsible for reading the value buffer
/// -- component of the rowgroup.
//
/// file-value-class-name ::=
///   Text {"org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer"}
//
/// -- Boolean variable indicating whether or not the file uses compression
/// -- for the key and column buffer sections.
//
/// file-is-compressed ::= Byte[1]
//
/// -- A boolean field indicating whether or not the file is block compressed.
/// -- This field is *always* false. According to comments in the original
/// -- RCFile implementation this field was retained for backwards
/// -- compatability with the SequenceFile format.
//
/// file-is-block-compressed ::= Byte[1] {false}
//
/// -- The Java class name of the compression codec iff <file-is-compressed>
/// -- is true. The named class must implement
/// -- org.apache.hadoop.io.compress.CompressionCodec.
/// -- The expected value is org.apache.hadoop.io.compress.GzipCodec.
//
/// file-compression-codec-class ::= Text
//
/// -- A collection of key-value pairs defining metadata values for the
/// -- file. The Map is serialized using standard JDK serialization, i.e.
/// -- an Int corresponding to the number of key-value pairs, followed by
/// -- Text key and value pairs. The following metadata properties are
/// -- mandatory for all RCFiles:
/// --
/// -- hive.io.rcfile.column.number: the number of columns in the RCFile
//
/// file-header-metadata ::= Map<Text, Text>
//
/// -- A 16 byte marker that is generated by the writer. This marker appears
/// -- at regular intervals at the beginning of rowgroup-headers, and is
/// -- intended to enable readers to skip over corrupted rowgroups.
//
/// file-sync-hash ::= Byte[16]
//
/// -- Each row group is split into three sections: a header, a set of
/// -- key buffers, and a set of column buffers. The header section includes
/// -- an optional sync hash, information about the size of the row group, and
/// -- the total number of rows in the row group. Each key buffer
/// -- consists of run-length encoding data which is used to decode
/// -- the length and offsets of individual fields in the corresponding column
/// -- buffer.
//
/// rcfile-rowgroup ::=
///   <rowgroup-header>
///   <rowgroup-key-data>
///   <rowgroup-column-buffers>
//
/// rowgroup-header ::=
///   [<rowgroup-sync-marker>, <rowgroup-sync-hash>]
///   <rowgroup-record-length>
///   <rowgroup-key-length>
///   <rowgroup-compressed-key-length>
//
/// -- rowgroup-key-data is compressed if the column data is compressed.
/// rowgroup-key-data ::=
///   <rowgroup-num-rows>
///   <rowgroup-key-buffers>
//
/// -- An integer (always -1) signaling the beginning of a sync-hash
/// -- field.
//
/// rowgroup-sync-marker ::= Int
//
/// -- A 16 byte sync field. This must match the <file-sync-hash> value read
/// -- in the file header.
//
/// rowgroup-sync-hash ::= Byte[16]
//
/// -- The record-length is the sum of the number of bytes used to store
/// -- the key and column parts, i.e. it is the total length of the current
/// -- rowgroup.
//
/// rowgroup-record-length ::= Int
//
/// -- Total length in bytes of the rowgroup's key sections.
//
/// rowgroup-key-length ::= Int
//
/// -- Total compressed length in bytes of the rowgroup's key sections.
//
/// rowgroup-compressed-key-length ::= Int
//
/// -- Number of rows in the current rowgroup.
//
/// rowgroup-num-rows ::= VInt
//
/// -- One or more column key buffers corresponding to each column
/// -- in the RCFile.
//
/// rowgroup-key-buffers ::= <rowgroup-key-buffer>+
//
/// -- Data in each column buffer is stored using a run-length
/// -- encoding scheme that is intended to reduce the cost of
/// -- repeated column field values. This mechanism is described
/// -- in more detail in the following entries.
//
/// rowgroup-key-buffer ::=
///   <column-buffer-length>
///   <column-buffer-uncompressed-length>
///   <column-key-buffer-length>
///   <column-key-buffer>
//
/// -- The serialized length on disk of the corresponding column buffer.
//
/// column-buffer-length ::= VInt
//
/// -- The uncompressed length of the corresponding column buffer. This
/// -- is equivalent to column-buffer-length if the RCFile is not compressed.
//
/// column-buffer-uncompressed-length ::= VInt
//
/// -- The length in bytes of the current column key buffer
//
/// column-key-buffer-length ::= VInt
//
/// -- The column-key-buffer contains a sequence of serialized VInt values
/// -- corresponding to the byte lengths of the serialized column fields
/// -- in the corresponding rowgroup-column-buffer. For example, consider
/// -- an integer column that contains the consecutive values 1, 2, 3, 44.
/// -- The RCFile format stores these values as strings in the column buffer,
/// -- e.g. "12344". The length of each column field is recorded in
/// -- the column-key-buffer as a sequence of VInts: 1,1,1,2. However,
/// -- if the same length occurs repeatedly, then we replace repeated
/// -- run lengths with the complement (i.e. negative) of the number of
/// -- repetitions, so 1,1,1,2 becomes 1,~2,2.
//
/// column-key-buffer ::= Byte[column-key-buffer-length]
//
/// rowgroup-column-buffers ::= <rowgroup-value-buffer>+
//
/// -- RCFile stores all column data as strings regardless of the
/// -- underlying column type. The strings are neither length-prefixed or
/// -- null-terminated, and decoding them into individual fields requires
/// -- the use of the run-length information contained in the corresponding
/// -- column-key-buffer.
//
/// rowgroup-column-buffer ::= Byte[column-buffer-length]
//
/// Byte ::= An eight-bit byte
//
/// VInt ::= Variable length integer. The high-order bit of each byte
/// indicates whether more bytes remain to be read. The low-order seven
/// bits are appended as increasingly more significant bits in the
/// resulting integer value.
//
/// Int ::= A four-byte integer in big-endian format.
//
/// Text ::= VInt, Chars (Length prefixed UTF-8 characters)
//
/// The above file format is read in chunks.  The "key" buffer is read. The "keys"
/// are really the lengths of the column data blocks and the lengths of the values
/// within those blocks.  Using this information the column "buffers" (data)
/// that are needed by the query are read into a single buffer.  Column data that
/// is not used by the query is skipped and not read from the file.  The key data
/// and the column data may be compressed.  The key data is compressed in a single
/// block while the column data is compressed separately by column.

#include "exec/base-sequence-scanner.h"

namespace impala {

struct HdfsFileDesc;
class HdfsScanNodeBase;
class TupleDescriptor;
class Tuple;

/// A scanner for reading RCFiles into tuples.
class HdfsRCFileScanner : public BaseSequenceScanner {
 public:
  HdfsRCFileScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
  virtual ~HdfsRCFileScanner();

  virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;

  void DebugString(int indentation_level, std::stringstream* out) const;

 private:
  /// The key class name located in the RCFile Header.
  /// This is always "org.apache.hadoop.hive.ql.io.RCFile$KeyBuffer"
  static const char* const RCFILE_KEY_CLASS_NAME;

  /// The value class name located in the RCFile Header.
  /// This is always "org.apache.hadoop.hive.ql.io.RCFile$ValueBuffer"
  static const char* const RCFILE_VALUE_CLASS_NAME;

  /// RCFile metadata key for determining the number of columns
  /// present in the RCFile: "hive.io.rcfile.column.number"
  static const char* const RCFILE_METADATA_KEY_NUM_COLS;

  /// The four byte RCFile unique version header present at the beginning
  /// of the file {'R', 'C', 'F' 1}
  static const uint8_t RCFILE_VERSION_HEADER[4];

  // Check max column limit
  static const int MAX_NCOLS;

  /// Implementation of superclass functions.
  virtual FileHeader* AllocateFileHeader();
  virtual Status ReadFileHeader() WARN_UNUSED_RESULT;
  virtual Status InitNewRange() WARN_UNUSED_RESULT;
  virtual Status ProcessRange(RowBatch* row_batch) WARN_UNUSED_RESULT;

  virtual THdfsFileFormat::type file_format() const { return THdfsFileFormat::RC_FILE; }

  /// Reads the RCFile Header Metadata section in the current file to determine the number
  /// of columns.  Other pieces of the metadata are ignored.
  Status ReadNumColumnsMetadata() WARN_UNUSED_RESULT;

  /// Reads the rowgroup header starting after the sync.
  /// Sets:
  ///   key_length_
  ///   compressed_key_length_
  ///   num_rows_
  Status ReadRowGroupHeader() WARN_UNUSED_RESULT;

  /// Read the rowgroup key buffers, decompress if necessary.
  /// The "keys" are really the lengths for the column values.  They
  /// are read here and then used to decode the values in the column buffer.
  /// Calls GetCurrentKeyBuffer for each column to process the key data.
  Status ReadKeyBuffers() WARN_UNUSED_RESULT;

  /// Process the current key buffer.
  /// Inputs:
  ///   col_idx: column to process
  ///   skip_col_data: if true, just skip over the key data.
  /// Input/Output:
  ///   key_buf_ptr: Pointer to the buffered file data, this will be moved
  ///                past the data for this column.
  ///   buf_len: Length of the buffer that will be read.
  /// Sets:
  ///   col_buf_len_
  ///   col_buf_uncompressed_len_
  ///   col_key_bufs_
  ///   col_bufs_off_
  Status GetCurrentKeyBuffer(
      int col_idx, bool skip_col_data, uint8_t** key_buf_ptr, int buf_length);

  /// Read the rowgroup column buffers
  /// Sets:
  ///   column_buffer_: Fills the buffer with either file data or decompressed data.
  Status ReadColumnBuffers() WARN_UNUSED_RESULT;

  /// Look at the next field in the specified column buffer
  /// Input:
  ///   col_idx: Column of the field.
  /// Modifies:
  ///   cur_field_length_rep_[col_idx]
  ///   key_buf_pos_[col_idx]
  ///   cur_field_length_rep_[col_idx]
  ///   cur_field_length_[col_idx]
  Status NextField(int col_idx) WARN_UNUSED_RESULT;

  /// Read a row group (except for the sync marker and sync) into buffers.
  /// Calls:
  ///   ReadRowGroupHeader()
  ///   ReadKeyBuffers()
  ///   ReadColumnBuffers()
  Status StartRowGroup() WARN_UNUSED_RESULT;

  /// Move to next row. Calls NextField on each column that we are reading.
  /// Modifies:
  ///   row_pos_
  Status NextRow() WARN_UNUSED_RESULT;

  /// Error message printed on a formatted stream when a bad column idex is encountered.
  /// Input:
  ///   col_idx: column to print.
  /// Output:
  ///   Error status, with formatted stream.
  Status BadColumnInfo(int col_idx);

  enum Version {
    SEQ6,     // Version for sequence file and pre hive-0.9 rc files
    RCF1      // The version post hive-0.9 which uses a new header
  };

  /// Data that is fixed across headers.  This struct is shared between scan ranges.
  struct RcFileHeader : public BaseSequenceScanner::FileHeader {
    /// RC file version
    Version version;

    /// The number of columns in the file (may be more than the number of columns in the
    /// table metadata)
    int num_cols;
  };

  /// Struct encapsulating all the state for parsing a single column from a row
  /// group
  struct ColumnInfo {
    /// If true, this column should be materialized, otherwise, it can be skipped
    bool materialize_column;

    /// Uncompressed and compressed byte lengths for this column
    int32_t buffer_len;
    int32_t uncompressed_buffer_len;

    /// Length and start of the key for this column.
    int32_t key_buffer_len;
    /// This is a ptr into the scanner's key_buffer_ for this column.
    uint8_t* key_buffer;
    /// Length of the key buffer
    int32_t buf_length;

    /// Current position in the key buffer
    int32_t key_buffer_pos;

    /// Offset into row_group_buffer_ for the start of this column.
    int32_t start_offset;

    /// Offset from the start of the column for the next field in the column
    int32_t buffer_pos;

    /// RLE: Length of the current field
    int32_t current_field_len;
    /// RLE: Repetition count of the current field
    int32_t current_field_len_rep;
  };

  /// Vector of column descriptions for each column in the file (i.e., may contain a
  /// different number of non-partition columns than are in the table metadata).  Indexed
  /// by column index, including non-materialized columns.
  std::vector<ColumnInfo> columns_;

  /// Buffer for copying key buffers.  This buffer is reused between row groups.
  std::vector<uint8_t> key_buffer_;

  /// number of rows in this rowgroup object
  int num_rows_ = 0;

  /// Current row position in this rowgroup.
  /// This value is incremented each time NextRow() is called.
  int row_pos_ = 0;

  /// Size of the row group's key buffers.
  /// Read from the row group header.
  int key_length_ = -1;

  /// Compressed size of the row group's key buffers.
  /// Read from the row group header.
  int compressed_key_length_ = -1;

  /// If true, the row_group_buffer_ can be reused across row groups, otherwise,
  /// it (more specifically the data_buffer_pool_ that allocated the row_group_buffer_)
  /// must be attached to the row batch.
  bool reuse_row_group_buffer_ = false;

  /// Buffer containing the entire row group.  We allocate a buffer for the entire
  /// row group, skipping non-materialized columns.
  uint8_t* row_group_buffer_ = nullptr;

  /// Sum of the bytes lengths of the materialized columns in the current row group.  This
  /// is the number of valid bytes in row_group_buffer_.
  int64_t row_group_length_ = 0;

  /// This is the allocated size of 'row_group_buffer_'.  'row_group_buffer_' is reused
  /// across row groups and will grow as necessary.
  int64_t row_group_buffer_size_ = 0;
};

}

#endif
